/*
 * Tencent is pleased to support the open source community by making TBase available.  
 * 
 * Copyright (C) 2019 Tencent.  All rights reserved.
 * 
 * TBase is licensed under the BSD 3-Clause License, except for the third-party component listed below. 
 * 
 * A copy of the BSD 3-Clause License is included in this file.
 * 
 * Other dependencies and licenses:
 * 
 * Open Source Software Licensed Under the PostgreSQL License: 
 * --------------------------------------------------------------------
 * 1. Postgres-XL XL9_5_STABLE
 * Portions Copyright (c) 2015-2016, 2ndQuadrant Ltd
 * Portions Copyright (c) 2012-2015, TransLattice, Inc.
 * Portions Copyright (c) 2010-2017, Postgres-XC Development Group
 * Portions Copyright (c) 1996-2015, The PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, The Regents of the University of California
 * 
 * Terms of the PostgreSQL License: 
 * --------------------------------------------------------------------
 * Permission to use, copy, modify, and distribute this software and its
 * documentation for any purpose, without fee, and without a written agreement
 * is hereby granted, provided that the above copyright notice and this
 * paragraph and the following two paragraphs appear in all copies.
 * 
 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY FOR
 * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
 * LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
 * DOCUMENTATION, EVEN IF THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 * 
 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
 * AND FITNESS FOR A PARTICULAR PURPOSE.  THE SOFTWARE PROVIDED HEREUNDER IS
 * ON AN "AS IS" BASIS, AND THE UNIVERSITY OF CALIFORNIA HAS NO OBLIGATIONS TO
 * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
 * 
 * 
 * Terms of the BSD 3-Clause License:
 * --------------------------------------------------------------------
 * Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met:
 * 
 * 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
 * 
 * 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation 
 * and/or other materials provided with the distribution.
 * 
 * 3. Neither the name of Tencent nor the names of its contributors may be used to endorse or promote products derived from this software without 
 * specific prior written permission.
 * 
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, 
 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS 
 * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE 
 * GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH 
 * DAMAGE.
 * 
 */
/*-------------------------------------------------------------------------
 *
 * freelist.c
 *      routines for managing the buffer pool's replacement strategy.
 *
 *
 * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *      src/backend/storage/buffer/freelist.c
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "port/atomics.h"
#include "storage/buf_internals.h"
#include "storage/bufmgr.h"
#include "storage/proc.h"

/*
 * Read an int variable through a volatile-qualified lvalue, so that the
 * compiler performs exactly one load at this point instead of re-reading the
 * variable from memory later.  StrategyGetBuffer() uses this to take a stable
 * snapshot of the shared bgwprocno field without holding a spinlock.
 */
#define INT_ACCESS_ONCE(var)    ((int)(*((volatile int *)&(var))))


/*
 * The shared freelist control information.
 *
 * A single instance is obtained from shared memory by StrategyInitialize()
 * and is reached through the StrategyControl pointer.
 */
typedef struct
{
    /*
     * Spinlock: protects the values below.  Note that the pg_atomic_* members
     * are also advanced without it (see ClockSweepTick and
     * StrategyGetBuffer), and that StrategyGetBuffer peeks at firstFreeBuffer
     * and reads/clears bgwprocno without taking it.
     */
    slock_t        buffer_strategy_lock;

    /*
     * Clock sweep hand: index of next buffer to consider grabbing. Note that
     * this isn't a concrete buffer - we only ever increase the value. So, to
     * get an actual buffer, it needs to be used modulo NBuffers.
     */
    pg_atomic_uint32 nextVictimBuffer;

    int            firstFreeBuffer;    /* Head of list of unused buffers */
    int            lastFreeBuffer; /* Tail of list of unused buffers */

    /*
     * NOTE: lastFreeBuffer is undefined when firstFreeBuffer is -1 (that is,
     * when the list is empty)
     */

    /*
     * Statistics.  These counters should be wide enough that they can't
     * overflow during a single bgwriter cycle.
     */
    uint32        completePasses; /* Complete cycles of the clock sweep */
    pg_atomic_uint32 numBufferAllocs;    /* Buffers allocated since last reset */

    /*
     * Index into ProcGlobal->allProcs of the bgwriter process to be notified
     * upon activity, or -1 if none. See StrategyNotifyBgWriter.
     */
    int            bgwprocno;
} BufferStrategyControl;

/*
 * Pointer to the shared strategy control block.  NULL until
 * StrategyInitialize() has obtained the block from ShmemInitStruct().
 */
static BufferStrategyControl *StrategyControl = NULL;

/*
 * Private (non-shared) state for managing a ring of shared buffers to re-use.
 * This is currently the only kind of BufferAccessStrategy object, but someday
 * we might have more kinds.
 *
 * Objects are created by GetAccessStrategy() with palloc0, so every field
 * other than btype and ring_size starts out as zero.
 */
typedef struct BufferAccessStrategyData
{
    /* Overall strategy type */
    BufferAccessStrategyType btype;
    /* Number of elements in buffers[] array that the ring cycles through */
    int            ring_size;

    /*
     * Index of the "current" slot in the ring, ie, the one most recently
     * returned by GetBufferFromRing.  AddBufferToRing and
     * StrategyRejectBuffer operate on this slot.
     */
    int            current;

    /*
     * True if the buffer just returned by StrategyGetBuffer had been in the
     * ring already.  Set by GetBufferFromRing, consulted by
     * StrategyRejectBuffer.
     */
    bool        current_was_in_ring;

    /*
     * Array of buffer numbers.  InvalidBuffer (that is, zero) indicates we
     * have not yet selected a buffer for this ring slot.  For allocation
     * simplicity this is palloc'd together with the fixed fields of the
     * struct.
     */
    Buffer        buffers[FLEXIBLE_ARRAY_MEMBER];
}            BufferAccessStrategyData;


/*
 * Prototypes for internal functions.  Both operate on the backend-private
 * buffer ring and are defined near the end of this file.
 */
static BufferDesc *GetBufferFromRing(BufferAccessStrategy strategy,
                  uint32 *buf_state);
static void AddBufferToRing(BufferAccessStrategy strategy,
                BufferDesc *buf);

/*
 * ClockSweepTick - Helper routine for StrategyGetBuffer()
 *
 * Move the clock hand one buffer ahead of its current position and return the
 * id of the buffer now under the hand.
 *
 * The returned value is always less than NBuffers, so it can be used directly
 * as a buffer descriptor index.  The shared counter itself is allowed to run
 * past NBuffers; the caller that lands exactly on a multiple of NBuffers is
 * responsible for folding the counter back into range and bumping
 * completePasses.
 */
static inline uint32
ClockSweepTick(void)
{
    uint32        victim;

    /*
     * Atomically move hand ahead one buffer - if there's several processes
     * doing this, this can lead to buffers being returned slightly out of
     * apparent order.
     */
    victim =
        pg_atomic_fetch_add_u32(&StrategyControl->nextVictimBuffer, 1);

    if (victim >= NBuffers)
    {
        uint32        originalVictim = victim;

        /* always wrap what we look up in BufferDescriptors */
        victim = victim % NBuffers;

        /*
         * If we're the one that just caused a wraparound, force
         * completePasses to be incremented while holding the spinlock. We
         * need the spinlock so StrategySyncStart() can return a consistent
         * value consisting of nextVictimBuffer and completePasses.
         */
        if (victim == 0)
        {
            uint32        expected;
            uint32        wrapped;
            bool        success = false;

            /*
             * The value the counter holds right after our own fetch-add,
             * provided no other process has advanced it in the meantime.
             */
            expected = originalVictim + 1;

            while (!success)
            {
                /*
                 * Acquire the spinlock while increasing completePasses. That
                 * allows other readers to read nextVictimBuffer and
                 * completePasses in a consistent manner which is required for
                 * StrategySyncStart().  In theory delaying the increment
                 * could lead to an overflow of nextVictimBuffer, but that's
                 * highly unlikely and wouldn't be particularly harmful.
                 */
                SpinLockAcquire(&StrategyControl->buffer_strategy_lock);

                /*
                 * Recomputed on every iteration.  NOTE(review): the retry
                 * relies on pg_atomic_compare_exchange_u32 storing the
                 * counter's current value into 'expected' when it fails -
                 * confirm against port/atomics.h.
                 */
                wrapped = expected % NBuffers;

                success = pg_atomic_compare_exchange_u32(&StrategyControl->nextVictimBuffer,
                                                         &expected, wrapped);
                /* Count the pass only once the counter was really folded back */
                if (success)
                    StrategyControl->completePasses++;
                SpinLockRelease(&StrategyControl->buffer_strategy_lock);
            }
        }
    }
    return victim;
}

/*
 * StrategyGetBuffer
 *
 *    Called by the bufmgr to get the next candidate buffer to use in
 *    BufferAlloc(). The only hard requirement BufferAlloc() has is that
 *    the selected buffer must not currently be pinned by anyone.
 *
 *    strategy is a BufferAccessStrategy object, or NULL for default strategy.
 *
 *    To ensure that no one else can pin the buffer before we do, we must
 *    return the buffer with the buffer header spinlock still held.
 *
 *    *buf_state is set to the buffer state word that was obtained when the
 *    returned buffer's header spinlock was taken.
 *
 *    Candidates are tried in this order: the strategy's ring (if any), the
 *    shared freelist, and finally the clock sweep.  If the clock sweep sees
 *    NBuffers pinned buffers in a row without being able to decrement any
 *    usage count, elog(ERROR) is raised instead of returning.
 */
BufferDesc *
StrategyGetBuffer(BufferAccessStrategy strategy, uint32 *buf_state)
{// #lizard forgives
    BufferDesc *buf;
    int            bgwprocno;
    int            trycounter;
    uint32        local_buf_state;    /* to avoid repeated (de-)referencing */

    /*
     * If given a strategy object, see whether it can select a buffer. We
     * assume strategy objects don't need buffer_strategy_lock.
     */
    if (strategy != NULL)
    {
        buf = GetBufferFromRing(strategy, buf_state);
        if (buf != NULL)
            return buf;
    }

    /*
     * If asked, we need to waken the bgwriter. Since we don't want to rely on
     * a spinlock for this we force a read from shared memory once, and then
     * set the latch based on that value. We need to go to that length
     * because otherwise bgwprocno might be reset while/after we check because
     * the compiler might just reread from memory.
     *
     * This can possibly set the latch of the wrong process if the bgwriter
     * dies in the wrong moment. But since PGPROC->procLatch is never
     * deallocated the worst consequence of that is that we set the latch of
     * some arbitrary process.
     */
    bgwprocno = INT_ACCESS_ONCE(StrategyControl->bgwprocno);
    if (bgwprocno != -1)
    {
        /* reset bgwprocno first, before setting the latch */
        StrategyControl->bgwprocno = -1;

        /*
         * Not acquiring ProcArrayLock here which is slightly icky. It's
         * actually fine because procLatch isn't ever freed, so we just can
         * potentially set the wrong process' (or no process') latch.
         */
        SetLatch(&ProcGlobal->allProcs[bgwprocno].procLatch);
    }

    /*
     * We count buffer allocation requests so that the bgwriter can estimate
     * the rate of buffer consumption.  Note that buffers recycled by a
     * strategy object are intentionally not counted here.
     */
    pg_atomic_fetch_add_u32(&StrategyControl->numBufferAllocs, 1);

    /*
     * First check, without acquiring the lock, whether there's buffers in the
     * freelist. Since we otherwise don't require the spinlock in every
     * StrategyGetBuffer() invocation, it'd be sad to acquire it here -
     * uselessly in most cases. That obviously leaves a race where a buffer is
     * put on the freelist but we don't see the store yet - but that's pretty
     * harmless, it'll just get used during the next buffer acquisition.
     *
     * If there's buffers on the freelist, acquire the spinlock to pop one
     * buffer off the freelist. Then check whether that buffer is usable and
     * repeat if not.
     *
     * Note that the freeNext fields are considered to be protected by the
     * buffer_strategy_lock not the individual buffer spinlocks, so it's OK to
     * manipulate them without holding the buffer header spinlock.
     */
    if (StrategyControl->firstFreeBuffer >= 0)
    {
        while (true)
        {
            /* Acquire the spinlock to remove element from the freelist */
            SpinLockAcquire(&StrategyControl->buffer_strategy_lock);

            /* Re-check under the lock: the list may have drained meanwhile */
            if (StrategyControl->firstFreeBuffer < 0)
            {
                SpinLockRelease(&StrategyControl->buffer_strategy_lock);
                break;
            }

            buf = GetBufferDescriptor(StrategyControl->firstFreeBuffer);
            Assert(buf->freeNext != FREENEXT_NOT_IN_LIST);

            /* Unconditionally remove buffer from freelist */
            StrategyControl->firstFreeBuffer = buf->freeNext;
            buf->freeNext = FREENEXT_NOT_IN_LIST;

            /*
             * Release the lock so someone else can access the freelist while
             * we check out this buffer.
             */
            SpinLockRelease(&StrategyControl->buffer_strategy_lock);

            /*
             * If the buffer is pinned or has a nonzero usage_count, we cannot
             * use it; discard it and retry.  (This can only happen if VACUUM
             * put a valid buffer in the freelist and then someone else used
             * it before we got to it.  It's probably impossible altogether as
             * of 8.3, but we'd better check anyway.)
             */
            local_buf_state = LockBufHdr(buf);
            if (BUF_STATE_GET_REFCOUNT(local_buf_state) == 0
                && BUF_STATE_GET_USAGECOUNT(local_buf_state) == 0)
            {
                /* Usable: return it with the header spinlock still held */
                if (strategy != NULL)
                    AddBufferToRing(strategy, buf);
                *buf_state = local_buf_state;
                return buf;
            }
            UnlockBufHdr(buf, local_buf_state);

        }
    }

    /* Nothing on the freelist, so run the "clock sweep" algorithm */
    trycounter = NBuffers;
    for (;;)
    {
        buf = GetBufferDescriptor(ClockSweepTick());

        /*
         * If the buffer is pinned or has a nonzero usage_count, we cannot use
         * it; decrement the usage_count (unless pinned) and keep scanning.
         */
        local_buf_state = LockBufHdr(buf);

        if (BUF_STATE_GET_REFCOUNT(local_buf_state) == 0)
        {
            if (BUF_STATE_GET_USAGECOUNT(local_buf_state) != 0)
            {
                /*
                 * The decremented state is handed to UnlockBufHdr at the
                 * bottom of the loop.
                 */
                local_buf_state -= BUF_USAGECOUNT_ONE;

                /* We made a state change, so restart the give-up countdown */
                trycounter = NBuffers;
            }
            else
            {
                /* Found a usable buffer */
                if (strategy != NULL)
                    AddBufferToRing(strategy, buf);
                *buf_state = local_buf_state;
                return buf;
            }
        }
        else if (--trycounter == 0)
        {
            /*
             * We've scanned all the buffers without making any state changes,
             * so all the buffers are pinned (or were when we looked at them).
             * We could hope that someone will free one eventually, but it's
             * probably better to fail than to risk getting stuck in an
             * infinite loop.
             */
            UnlockBufHdr(buf, local_buf_state);
            elog(ERROR, "no unpinned buffers available");
        }
        UnlockBufHdr(buf, local_buf_state);
    }
}

/*
 * StrategyFreeBuffer: push a buffer onto the head of the shared freelist
 *
 * The whole operation runs under buffer_strategy_lock.  A buffer whose
 * freeNext shows it is already linked into the list is left alone, so
 * calling this twice for the same buffer cannot corrupt the list.
 */
void
StrategyFreeBuffer(BufferDesc *buf)
{
    SpinLockAcquire(&StrategyControl->buffer_strategy_lock);

    if (buf->freeNext == FREENEXT_NOT_IN_LIST)
    {
        int            old_head = StrategyControl->firstFreeBuffer;

        /* Link in front of whatever was the head before */
        buf->freeNext = old_head;
        StrategyControl->firstFreeBuffer = buf->buf_id;

        /* List was empty, so the new head is also the tail */
        if (old_head < 0)
            StrategyControl->lastFreeBuffer = buf->buf_id;
    }

    SpinLockRelease(&StrategyControl->buffer_strategy_lock);
}

/*
 * StrategySyncStart -- tell BufferSync where to start syncing
 *
 * Returns the buffer index currently under the clock hand; BufferSync()
 * proceeds circularly around the buffer array from there.
 *
 * If complete_passes is non-NULL, it receives the number of completed clock
 * sweep passes, including wraparounds that are still pending in
 * nextVictimBuffer (see ClockSweepTick).  If num_buf_alloc is non-NULL, it
 * receives the count of buffer allocations since the last reset, and that
 * counter is reset to zero.
 *
 * Everything is read under buffer_strategy_lock so that the clock hand and
 * the pass count form a consistent pair.
 */
int
StrategySyncStart(uint32 *complete_passes, uint32 *num_buf_alloc)
{
    uint32        hand;
    int            start;

    SpinLockAcquire(&StrategyControl->buffer_strategy_lock);

    hand = pg_atomic_read_u32(&StrategyControl->nextVictimBuffer);
    start = hand % NBuffers;

    /* Stored passes plus those not yet folded out of the raw counter */
    if (complete_passes != NULL)
        *complete_passes = StrategyControl->completePasses + hand / NBuffers;

    /* Read-and-reset the allocation counter in one atomic step */
    if (num_buf_alloc != NULL)
        *num_buf_alloc =
            pg_atomic_exchange_u32(&StrategyControl->numBufferAllocs, 0);

    SpinLockRelease(&StrategyControl->buffer_strategy_lock);

    return start;
}

/*
 * StrategyNotifyBgWriter -- arm or disarm the allocation notification
 *
 * bgwprocno is stored in the shared control block.  While the stored value
 * is not -1, the next call of StrategyGetBuffer sets the latch of that
 * process and resets the value to -1.  Passing -1 cancels a notification
 * that has not fired yet.  The bgwriter uses this to be woken from
 * hibernation; nobody else is meant to call it.
 */
void
StrategyNotifyBgWriter(int bgwprocno)
{
    /*
     * Taking buffer_strategy_lock only serves to make the store look atomic
     * to StrategyGetBuffer.  This is called rarely, so the lock is cheap
     * insurance.
     */
    SpinLockAcquire(&StrategyControl->buffer_strategy_lock);

    StrategyControl->bgwprocno = bgwprocno;

    SpinLockRelease(&StrategyControl->buffer_strategy_lock);
}


/*
 * StrategyShmemSize
 *
 * Report how much shared memory the freelist-related structures need: the
 * buffer lookup hash table (sized here for historical reasons) plus the
 * strategy control block.
 */
Size
StrategyShmemSize(void)
{
    Size        total;

    /* lookup hash table; StrategyInitialize explains the entry count */
    total = BufTableShmemSize(NBuffers + NUM_BUFFER_PARTITIONS);

    /* shared replacement strategy control block */
    total = add_size(total, MAXALIGN(sizeof(BufferStrategyControl)));

    return total;
}

/*
 * StrategyInitialize -- set up the shared state of the buffer cache
 *        replacement strategy.
 *
 * Initializes the buffer lookup hash table and gets or creates the shared
 * strategy control block.  If the block is newly created, init must be true
 * and its fields are given their starting values; if it already exists, init
 * must be false and the block is left untouched.
 *
 * Assumes: All of the buffers are already built into a linked list.
 */
void
StrategyInitialize(bool init)
{
    bool        already_exists;

    /*
     * The lookup table must never run out of entries.  Steady-state usage is
     * NBuffers entries, but BufferAlloc() inserts the new entry before
     * deleting the old one, and that may be going on in every partition at
     * once - hence NBuffers + NUM_BUFFER_PARTITIONS.
     */
    InitBufTable(NBuffers + NUM_BUFFER_PARTITIONS);

    /* Get or create the shared strategy control block */
    StrategyControl = (BufferStrategyControl *)
        ShmemInitStruct("Buffer Strategy Status",
                        sizeof(BufferStrategyControl),
                        &already_exists);

    if (already_exists)
    {
        /* Somebody set the block up before us; nothing more to do */
        Assert(!init);
        return;
    }

    /* First-time setup: only done once, usually in postmaster */
    Assert(init);

    SpinLockInit(&StrategyControl->buffer_strategy_lock);

    /* No bgwriter notification is pending */
    StrategyControl->bgwprocno = -1;

    /* Statistics start from zero */
    pg_atomic_init_u32(&StrategyControl->numBufferAllocs, 0);
    StrategyControl->completePasses = 0;

    /* The clock sweep starts at the first buffer */
    pg_atomic_init_u32(&StrategyControl->nextVictimBuffer, 0);

    /*
     * Take over the whole chain of free buffers, which we assume
     * InitBufferPool() has linked together already.
     */
    StrategyControl->firstFreeBuffer = 0;
    StrategyControl->lastFreeBuffer = NBuffers - 1;
}


/* ----------------------------------------------------------------
 *                Backend-private buffer ring management
 * ----------------------------------------------------------------
 */


/*
 * GetAccessStrategy -- create a BufferAccessStrategy object
 *
 * Returns NULL for BAS_NORMAL, which callers treat as the default strategy.
 * For every other recognized type, a zero-filled ring object is palloc'd in
 * the current memory context, with a ring size chosen per type and capped at
 * NBuffers / 8.  An unrecognized type raises elog(ERROR).
 */
BufferAccessStrategy
GetAccessStrategy(BufferAccessStrategyType btype)
{// #lizard forgives
    BufferAccessStrategy result;
    int            ring_bytes;
    int            nslots;
    int            max_slots;

    /*
     * Pick the ring's byte budget.  See buffer/README for rationales.
     *
     * Note: if you change the budget for BAS_BULKREAD, see also
     * SYNC_SCAN_REPORT_INTERVAL in access/heap/syncscan.c.
     */
    switch (btype)
    {
        case BAS_NORMAL:
            /* NORMAL means "default" object, which is represented by NULL */
            return NULL;

        case BAS_BULKREAD:
        case BAS_VACUUM:
            ring_bytes = 256 * 1024;
            break;

        case BAS_BULKWRITE:
            ring_bytes = 16 * 1024 * 1024;
            break;

#ifdef __TBASE__
        case BAS_BULKREAD_STAT_SHARD:
            ring_bytes = 32 * 1024 * 1024;
            break;

        case BAS_BULKWRITE_PART:
            ring_bytes = 8 * 1024 * 1024;
            break;
#endif

        default:
            elog(ERROR, "unrecognized buffer access strategy: %d",
                 (int) btype);
            return NULL;        /* keep compiler quiet */
    }

    /* Convert the byte budget into a number of buffers */
    nslots = ring_bytes / BLCKSZ;

    /* The ring must not be an undue fraction of shared buffers */
    max_slots = NBuffers / 8;
    if (max_slots < nslots)
        nslots = max_slots;

    /* One allocation holds the fixed fields and the slots, all zeroed */
    result = (BufferAccessStrategy)
        palloc0(offsetof(BufferAccessStrategyData, buffers) +
                nslots * sizeof(Buffer));

    /* Only these two fields start out nonzero */
    result->ring_size = nslots;
    result->btype = btype;

    return result;
}

/*
 * FreeAccessStrategy -- release a BufferAccessStrategy object
 *
 * Today this is just a pfree, but callers should go through this function
 * rather than depend on how a BufferAccessStrategy is represented.  A NULL
 * ("default") strategy is accepted and ignored.
 */
void
FreeAccessStrategy(BufferAccessStrategy strategy)
{
    /* the "default" strategy has nothing to release */
    if (strategy == NULL)
        return;

    pfree(strategy);
}

/*
 * GetBufferFromRing -- try to reuse the buffer in the ring's next slot
 *
 * Advances the ring's current slot by one (wrapping around) and returns the
 * buffer stored there if it can be reused, with its header spinlock held and
 * *buf_state set to the state read under that lock.
 *
 * Returns NULL, with no lock held, if the slot is still empty or its buffer
 * must not be reused.  The caller is then expected to pick a buffer by the
 * normal allocation strategy and store it in this slot via AddBufferToRing.
 *
 * strategy->current_was_in_ring records which of the two outcomes occurred.
 */
static BufferDesc *
GetBufferFromRing(BufferAccessStrategy strategy, uint32 *buf_state)
{
    BufferDesc *candidate;
    Buffer        slot_buffer;
    uint32        state;

    /* Step to the next ring slot, wrapping at the end */
    strategy->current++;
    if (strategy->current >= strategy->ring_size)
        strategy->current = 0;

    slot_buffer = strategy->buffers[strategy->current];

    if (slot_buffer != InvalidBuffer)
    {
        candidate = GetBufferDescriptor(slot_buffer - 1);
        state = LockBufHdr(candidate);

        /*
         * A pinned buffer can never be taken.  A usage_count of 0 or 1 is
         * fair game: 1 is what our own previous use of this ring element
         * would have left, and the clock sweep may have lowered it since.
         * Anything higher means someone else touched the buffer, so we leave
         * it to them.
         */
        if (BUF_STATE_GET_REFCOUNT(state) == 0 &&
            BUF_STATE_GET_USAGECOUNT(state) <= 1)
        {
            strategy->current_was_in_ring = true;
            *buf_state = state;
            return candidate;
        }

        UnlockBufHdr(candidate, state);
    }

    /* Empty slot or unusable buffer: caller must allocate the normal way */
    strategy->current_was_in_ring = false;
    return NULL;
}

/*
 * AddBufferToRing -- remember a buffer in the ring's current slot
 *
 * Overwrites whatever the current slot held before.  The caller holds the
 * buffer header spinlock on buf, so this must stay very cheap.
 */
static void
AddBufferToRing(BufferAccessStrategy strategy, BufferDesc *buf)
{
    int            slot = strategy->current;

    strategy->buffers[slot] = BufferDescriptorGetBuffer(buf);
}

/*
 * StrategyRejectBuffer -- consider rejecting a dirty buffer
 *
 * With a nondefault strategy, the buffer manager calls this when the victim
 * chosen by StrategyGetBuffer would have to be written out and that write
 * would require a WAL flush as well.  We may then ask for another victim.
 *
 * Returns true if the buffer manager should ask for a new victim; the buffer
 * has then been dropped from the ring.  Returns false if this buffer should
 * be written and re-used.
 */
bool
StrategyRejectBuffer(BufferAccessStrategy strategy, BufferDesc *buf)
{
    /*
     * Reject only in bulkread mode, and only a buffer that was handed out
     * from the ring's current slot; buffers picked by the normal replacement
     * strategy are never interfered with.
     */
    if (strategy->btype == BAS_BULKREAD &&
        strategy->current_was_in_ring &&
        strategy->buffers[strategy->current] == BufferDescriptorGetBuffer(buf))
    {
        /*
         * Empty the slot; otherwise a ring consisting solely of dirty
         * buffers would make us loop forever.
         */
        strategy->buffers[strategy->current] = InvalidBuffer;
        return true;
    }

    return false;
}

#ifdef __TBASE__
/*
 * GetAccessStrategy_part -- create a BAS_BULKWRITE_PART strategy object
 *        whose ring is scaled down for npart rings.
 *
 * The object is obtained from GetAccessStrategy(BAS_BULKWRITE_PART).  If
 * npart rings of that size would together exceed NBuffers / 8, the ring size
 * is reduced to (NBuffers / 8) divided by npart rounded up to a multiple of
 * 8, but never below one slot.  A non-positive npart leaves the ring size
 * unchanged.
 *
 * Only ring_size is lowered; the buffers[] array keeps the length it was
 * allocated with, so the reduced ring always fits.
 */
BufferAccessStrategy
GetAccessStrategy_part(int npart)
{
    BufferAccessStrategy strategy;
    int            budget = NBuffers / 8;

    strategy = GetAccessStrategy(BAS_BULKWRITE_PART);

    /*
     * For positive npart this is the same test as
     * "ring_size * npart > budget", written as a division so that a large
     * npart cannot overflow the signed multiplication.
     */
    if (npart > 0 && strategy->ring_size > budget / npart)
    {
        int            align_part = TYPEALIGN(8, npart);

        strategy->ring_size = budget / align_part;

        /*
         * The integer division yields zero once align_part exceeds the
         * budget; keep at least one usable slot so ring_size stays a valid
         * ring length.
         */
        if (strategy->ring_size < 1)
            strategy->ring_size = 1;
    }

    return strategy;
}
#endif
